package com.mango.iot.gateway.tcp.server;

import com.mango.iot.gateway.tcp.connection.ConnectionManager;
import com.mango.iot.gateway.tcp.handler.ProcessRunnable;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

/**
 * 物联网TCP网关
 *
 * @author liangfeihu
 * @since 2021/3/26 下午1:41
 */
@Slf4j
@Component
public class NettyTcpServer {

    @Value("${gateway.tcp.port}")
    private int port = 9099;

    /**
     * 逻辑处理线程
     */
    public static ProcessRunnable processRunnable = new ProcessRunnable();

    private static final int BIZ_GROUP_SIZE = 1;
    private static final int BIZ_THREAD_SIZE = Runtime.getRuntime().availableProcessors() * 2;

    private final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZ_GROUP_SIZE);
    private final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZ_THREAD_SIZE);

    /**
     * Tcp网关启动入口
     *
     * @throws Exception
     */
    @PostConstruct
    public void init() throws Exception {
        log.info("start tcp server ...");
        Class clazz = NioServerSocketChannel.class;
        // Server 服务启动
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup);
        bootstrap.channel(clazz);

        // 设置请求处理器
        bootstrap.childHandler(new ServerChannelInitializer());
        bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT)
                .childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);

        // 绑定接口，同步等待成功
        log.info("start tcp server at port[" + port + "].");
        // 绑定服务器等待直到绑定完成，调用sync()方法会阻塞直到服务器完成绑定,
        ChannelFuture future = bootstrap.bind(port).sync();
        ChannelFuture channelFuture = future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    log.info("Server have success bind to " + port + " !");
                } else {
                    log.error("Server fail bind to " + port);
                    throw new Exception("Server start fail !", future.cause());
                }
            }
        });

        // 启动工作线程
        Thread processThread = new Thread(processRunnable);
        log.info("业务处理器设置为daemon");
        processThread.setDaemon(true);
        log.info("启动业务处理器进程");
        processThread.start();
    }

    /**
     * 关闭Tcp网关
     */
    @PreDestroy
    public void shutdown() {
        log.info("shutdown tcp server ...");
        // 释放线程池资源
        try {
            ConnectionManager.getInstance().closeAllConn();
        } catch (Exception e) {
            log.error("[TcpServer]shutdown close all connection error=", e);
        }
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
        log.info("shutdown tcp server end.");
    }

}
